package com.wugui.datax.rpc.remoting.provider;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.wugui.datax.rpc.registry.ServiceRegistry;
import com.wugui.datax.rpc.remoting.net.Server;
import com.wugui.datax.rpc.remoting.net.impl.netty.server.NettyServer;
import com.wugui.datax.rpc.remoting.net.params.BaseCallback;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcRequest;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcResponse;
import com.wugui.datax.rpc.serialize.Serializer;
import com.wugui.datax.rpc.serialize.impl.HessianSerializer;
import com.wugui.datax.rpc.util.IpUtil;
import com.wugui.datax.rpc.util.NetUtil;
import com.wugui.datax.rpc.util.ThrowableUtil;
import com.wugui.datax.rpc.util.XxlRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * provider
 *
 * @author xuxueli 2015-10-31 22:54:27
 */
public class XxlRpcProviderFactory {
  private static final Logger logger = LoggerFactory.getLogger(XxlRpcProviderFactory.class);

  // ---------------------- config ----------------------
  /** init local rpc service map */
  private final Map<String, Object> serviceData = new HashMap<String, Object>();

  private Class<? extends Server> server = NettyServer.class;
  private Class<? extends Serializer> serializer = HessianSerializer.class;
  private int corePoolSize = 60;
  private int maxPoolSize = 300;
  private String ip = null; // for registry
  private int port = 7080; // default port
  private String accessToken = null;
  private Class<? extends ServiceRegistry> serviceRegistry = null;
  private Map<String, String> serviceRegistryParam = null;
  private Server serverInstance;
  private Serializer serializerInstance;
  private ServiceRegistry serviceRegistryInstance;
  private String serviceAddress;

  /**
   * make service key
   *
   * @param iface
   * @param version
   * @return
   */
  public static String makeServiceKey(String iface, String version) {
    String serviceKey = iface;
    if (StrUtil.isNotBlank(version)) {
      serviceKey += "#".concat(version);
    }
    return serviceKey;
  }

  // set
  public void setServer(Class<? extends Server> server) {
    this.server = server;
  }

  public void setSerializer(Class<? extends Serializer> serializer) {
    this.serializer = serializer;
  }

  public void setIp(String ip) {
    this.ip = ip;
  }

  public void setAccessToken(String accessToken) {
    this.accessToken = accessToken;
  }

  public void setServiceRegistry(Class<? extends ServiceRegistry> serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }

  public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
    this.serviceRegistryParam = serviceRegistryParam;
  }

  // get
  public Serializer getSerializerInstance() {
    return serializerInstance;
  }

  // ---------------------- start / stop ----------------------

  public int getPort() {
    return port;
  }

  public void setPort(int port) {
    this.port = port;
  }

  public int getCorePoolSize() {
    return corePoolSize;
  }

  public void setCorePoolSize(int corePoolSize) {
    this.corePoolSize = corePoolSize;
  }

  public int getMaxPoolSize() {
    return maxPoolSize;
  }

  public void setMaxPoolSize(int maxPoolSize) {
    this.maxPoolSize = maxPoolSize;
  }

  // ---------------------- server invoke ----------------------

  public void start() throws Exception {

    // valid
    if (this.server == null) {
      throw new XxlRpcException("xxl-rpc provider server missing.");
    }
    if (this.serializer == null) {
      throw new XxlRpcException("xxl-rpc provider serializer missing.");
    }
    if (!(this.corePoolSize > 0 && this.maxPoolSize > 0 && this.maxPoolSize >= this.corePoolSize)) {
      this.corePoolSize = 60;
      this.maxPoolSize = 300;
    }
    if (this.ip == null) {
      this.ip = IpUtil.getIp();
    }
    if (this.port <= 0) {
      this.port = 7080;
    }
    if (NetUtil.isPortUsed(this.port)) {
      throw new XxlRpcException("xxl-rpc provider port[" + this.port + "] is used.");
    }

    // init serializerInstance
    this.serializerInstance = serializer.newInstance();

    // start server
    serviceAddress = IpUtil.getIpPort(this.ip, port);
    serverInstance = server.newInstance();
    serverInstance.setStartedCallback(
        new BaseCallback() { // serviceRegistry started
          @Override
          public void run() throws Exception {
            // start registry
            if (serviceRegistry != null) {
              serviceRegistryInstance = serviceRegistry.newInstance();
              serviceRegistryInstance.start(serviceRegistryParam);
              if (CollUtil.isNotEmpty(serviceData)) {
                serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
              }
            }
          }
        });
    serverInstance.setStoppedCallback(
        new BaseCallback() { // serviceRegistry stoped
          @Override
          public void run() {
            // stop registry
            if (serviceRegistryInstance != null) {
              if (CollUtil.isNotEmpty(serviceData)) {
                serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
              }
              serviceRegistryInstance.stop();
              serviceRegistryInstance = null;
            }
          }
        });
    serverInstance.start(this);
  }

  public void stop() throws Exception {
    // stop server
    serverInstance.stop();
  }

  public Map<String, Object> getServiceData() {
    return serviceData;
  }

  /**
   * add service
   *
   * @param iface
   * @param version
   * @param serviceBean
   */
  public void addService(String iface, String version, Object serviceBean) {
    String serviceKey = makeServiceKey(iface, version);
    serviceData.put(serviceKey, serviceBean);

    logger.info(
        ">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}",
        serviceKey,
        serviceBean.getClass());
  }

  /**
   * invoke service
   *
   * @param xxlRpcRequest
   * @return
   */
  public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

    //  make response
    XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
    xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

    // match service bean
    String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
    Object serviceBean = serviceData.get(serviceKey);

    // valid
    if (serviceBean == null) {
      xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found.");
      return xxlRpcResponse;
    }

    if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3 * 60 * 1000) {
      xxlRpcResponse.setErrorMsg(
          "The timestamp difference between admin and executor exceeds the limit.");
      return xxlRpcResponse;
    }
    if (StrUtil.isNotBlank(accessToken)
        && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
      xxlRpcResponse.setErrorMsg(
          "The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
      return xxlRpcResponse;
    }

    try {
      // invoke
      Class<?> serviceClass = serviceBean.getClass();
      String methodName = xxlRpcRequest.getMethodName();
      Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
      Object[] parameters = xxlRpcRequest.getParameters();

      Method method = serviceClass.getMethod(methodName, parameterTypes);
      method.setAccessible(true);
      Object result = method.invoke(serviceBean, parameters);

      /*FastClass serviceFastClass = FastClass.create(serviceClass);
      FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
      Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

      xxlRpcResponse.setResult(result);
    } catch (Throwable t) {
      // catch error
      logger.error("xxl-rpc provider invokeService error.", t);
      xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
    }

    return xxlRpcResponse;
  }
}
